Is producer.flush() a must? · Issue #137 · confluentinc/confluent
I have producer code that uses a delivery callback, but the callback never reports an error or raises, even when no producer connection exists; it always shows success. When Kafka is up, the message is produced. I need an error raised when the first message's delivery fails: I can catch the failure for the second message, but not for the first. Can someone help me understand this behaviour, or how to handle a first-message failure? Below is my snippet:

```python
import datetime
import json

from confluent_kafka import avro, KafkaError
from confluent_kafka.avro import AvroProducer
from confluent_kafka import Producer

logger = get_logger()

class SimpleProducer(config):
    ....

    def produce_with_callback(self, key=None, value=None, poll_timeout=305000):
        def delivery_callback(error, msg):
            if error:
                logger.error(f"Message failed delivery for {msg.value()} with {error}")
                raise KafkaError
            else:
                logger.debug(f"Message delivered to {msg.topic()}, {msg.partition()}, {msg.offset()}")

        produce(topic=self._topic_name, key=key, value=value,
                callback=lambda error, message: delivery_callback(error, message))
        # Poll time set to higher than message.timeout.ms
        poll(poll_timeout)


configuration = {'linger.ms': 30, 'acks': 'All',
                 'request.timeout.ms': 10000, 'message.timeout.ms': 90000}
count = 1
jproducer = SimpleProducer(producer_id="producer_id", topic_name="test",
                           broker_host="localhost", broker_port='9092',
                           configurations=configuration)
print(f" producing {count}-- {datetime.datetime.now()}")
try:
    jproducer.produce_with_callback(key='1', value=json.dumps(count), poll_timeout=130)
except BufferError as e:
    print(f"Buffer error: {datetime.datetime.now()} \n {e}")
except Exception as e:
    print(f"produce error: {datetime.datetime.now()} \n {e}")
else:
    print(f"produced successfully {count}: {datetime.datetime.now()}")
    count += 1
```
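A note on the behaviour being asked about: `produce()` only enqueues the message, and delivery callbacks are served from `poll()`/`flush()`. When no broker is reachable, the first message's delivery report does not arrive until `message.timeout.ms` expires, so a short poll sees no error. Below is a minimal sketch of one common pattern: record delivery errors inside the callback and re-raise them after `flush()`, rather than raising from the callback itself. The helper `produce_and_wait` and the `RuntimeError` are illustrative assumptions, not part of confluent-kafka's API, and the producer object is assumed to have the standard `produce`/`flush` methods.

```python
# Hypothetical helper: produce one message and surface its delivery error,
# if any, after flush() has served the delivery callback.
def produce_and_wait(producer, topic, key, value, timeout=10.0):
    errors = []

    def on_delivery(err, msg):
        # Invoked from poll()/flush(); just record the error here instead of
        # raising, so the failure can be re-raised in the caller's context.
        if err is not None:
            errors.append(err)

    producer.produce(topic, key=key, value=value, callback=on_delivery)
    # flush() serves delivery callbacks and blocks until every outstanding
    # message is delivered or message.timeout.ms expires, so even the very
    # first message's failure is reported before we return.
    producer.flush(timeout)
    if errors:
        raise RuntimeError(f"delivery failed: {errors[0]}")
```

With this shape, the caller's `try/except` around the produce call catches a first-message failure as soon as the timeout elapses, instead of silently succeeding.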